/*
 * PRedisBase.cpp
 *
 *  Created on: Oct 11, 2013
 *      Author: yuliang
 */

#include "PRedisBase.h"
#include "PLog.h"
#include "PIOStreamOperator.h"

// Status text; in this file it is only referenced from commented-out code.
#define REDIS_STATUS_SUCCESS	"OK"

// Key-level commands.
#define REDIS_KEY_DEL		"del"

#define REDIS_KEY_EXPIRE	"expire"
#define REDIS_KEY_EXPIREAT	"expireat"

// String commands.
#define REDIS_STR_ADD		"set"
#define REDIS_STR_GET		"get"
#define REDIS_STR_INCRBY	"incrby"

// Hash commands.
#define REDIS_HASH_ADD		"hset"
#define REDIS_HASH_MADD		"hmset"
#define REDIS_HASH_GET		"hget"
#define REDIS_HASH_MGET		"hmget"
#define REDIS_HASH_DEL		"hdel"
#define REDIS_HASH_INCRBY	"hincrby"
#define REDIS_HASH_ALL		"hgetall"

// Set commands.
#define REDIS_SET_ADD		"sadd"				// add member(s)
#define REDIS_SET_DEL		"srem"				// remove member(s)
#define REDIS_SET_EXIST		"sismember"			// membership test
#define REDIS_SET_INTER		"sinter"			// fetch / intersection
#define REDIS_SET_CARD		"scard"				// member count
// Commented-out macro names below are not compiled.
//#define RSETMEMBERS "smembers"
//#define RHASHSET "hset"
//#define RHASHMSET "hmset"
//#define RHASHDEL "hdel"
//#define RHASHGET "hget"
//#define RDELKEY "del"
//#define REXPIREKEY "expire"
//#define RSETUNION "sunion"
//#define RHASHMGET "hmget"

// Initial value of the per-command attempt counter: each command wrapper in
// this file tries redisCommand() at most this many times.
#define REDIS_RECOUNT 2

/*
 * @brief       Build an unconnected client: no redis context and a
 *              zero-filled stored configuration.
 */
CPRedisBase::CPRedisBase() : m_redisContext(NULL) {
	memset(&m_redisConfig, 0, sizeof(sRedisConfig));
}

/*
 * @brief       Tear down the object; any open redis context is released
 *              through Disconnect().
 */
CPRedisBase::~CPRedisBase() {
	Disconnect();
}

/*
 * @brief       Connect to redis (with a connect timeout) and select the
 *              configured database. On success the configuration is stored
 *              in m_redisConfig so Reconnect() can reuse it.
 * @author      lyl
 * @param [in]  config : redis config; host, port, db and timeout are read
 * @return      bool   : true on success or when a context already exists,
 *                       false on a NULL config, connect failure or SELECT failure
 */
bool CPRedisBase::TimeConnect(const sRedisConfig* config) {
	if (!config) {
		MLOG_ERROR(LOG_REDIS, "Param error!");
		return false;
	}
	if (m_redisContext) {
		MLOG_WARN(LOG_REDIS, "Redis already connected!");
		return true;
	}
	bool result = false;
	do {
		MLOG_TRACE(LOG_REDIS,
				"Connect|" << config->host << "|" << config->port << "|" << config->db);
		struct timeval out_;
		out_.tv_sec = config->timeout;
		out_.tv_usec = 0;

		m_redisContext = redisConnectWithTimeout(config->host, config->port, out_);
		// hiredis returns NULL when it cannot even allocate the context; that
		// case must be caught before the err field is inspected.
		if (NULL == m_redisContext) {
			MLOG_ERROR(LOG_REDIS,
					"failed|connect|alloc|" << config->host << "|" << config->port << "|" << config->db);
			break;
		}
		if (m_redisContext->err) {
			MLOG_ERROR(LOG_REDIS,
					"failed|connect|"<< m_redisContext->err <<"|" << m_redisContext->errstr <<
					"|" << config->host << "|" << config->port << "|" << config->db);
			break;
		}
		if (!SelectDB(config->db)) {
			break;
		}
		// Reconnect() passes &m_redisConfig itself; copying a region onto
		// itself with memcpy is undefined, so skip the copy in that case.
		if (config != &m_redisConfig) {
			memcpy(&m_redisConfig, config, sizeof(sRedisConfig));
		}
		result = true;
	} while (0);

	if (!result) {
		// Drop a half-initialised context so later calls see "not connected".
		if (m_redisContext) {
			redisFree(m_redisContext);
		}
		m_redisContext = NULL;
	}

	return result;
}

/*
 * @brief       Disconnect from redis: free the hiredis context if one exists
 *              and leave m_redisContext set to NULL.
 * @author      lyl
 * @return      void
 */
void CPRedisBase::Disconnect() {
	if (m_redisContext) {
		MLOG_TRACE(LOG_REDIS, "Disconnect redis");
		redisFree(m_redisContext);
		m_redisContext = NULL;
	}
}

/*
 * @brief       Reconnect to redis: drop the current context, then connect
 *              again using the configuration stored in m_redisConfig.
 * @author      lyl
 * @return      bool      : true on success, false on failure
 */
bool CPRedisBase::Reconnect() {
	Disconnect();
	const bool connected = TimeConnect(&m_redisConfig);
	return connected;
}

/*
 * @brief       选择redis的数据源
 * @auther      lyl
 * @param [in]  redisdb : db of redis
 * @return      bool    : true表示成功, false表示失败
 */
bool CPRedisBase::SelectDB(const char* redisdb) {
	if (NULL == m_redisContext) {
		return false;
	}

	redisReply* reply = (redisReply*) redisCommand(m_redisContext, "SELECT %s", redisdb);
	if (!reply) {
		return false;
	}
	int type = reply->type;
	freeReplyObject(reply);

	return REDIS_REPLY_ERROR != type;
}

/*
 * @brief       Send "del <key>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key : key to delete
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::DelKey(const std::string& key) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_KEY_DEL << " " << key;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER)
				tResult = static_cast<int32_t>(reply->integer);
			else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "expire <key> <sec>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key : key whose expiry is set
 * @param [in]  sec : value sent as the EXPIRE argument
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::OutTime(const std::string& key, uint32_t sec) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_KEY_EXPIRE << " " << key << " " << sec;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER)
				tResult = static_cast<int32_t>(reply->integer);
			else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "set <key> <value>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : string key
 * @param [in]  value : value to store
 * @return      int32_t : 1 when redis answers with a status reply (the status
 *                        text itself is not checked); RedisError when
 *                        checkInit() fails, no reply could be obtained, or
 *                        the reply has another type
 */
int32_t CPRedisBase::StringAdd(const std::string& key, const std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_STR_ADD << " " << key << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_STATUS/* && strcmp(reply->str, REDIS_STATUS_SUCCESS)*/)
				tResult = 1;
			else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "get <key>" to redis and hand back the stored string.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : string key
 * @param [out] value : the reply payload is APPENDED to this string (existing
 *                      content is kept); untouched unless a string reply arrives
 * @return      int32_t : 1 on a string reply, 0 on a nil reply; RedisError when
 *                        checkInit() fails, no reply could be obtained, or
 *                        the reply has another type
 */
int32_t CPRedisBase::StringGet(const std::string& key, std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_STR_GET << " " << key;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_STRING) {
				// Use the reported length so payloads containing NUL bytes
				// are not cut short.
				value.append(reply->str, reply->len);
				tResult = 1;
			} else if (reply->type == REDIS_REPLY_NIL) {
				tResult = 0;
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "incrby <key> <value>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : string key
 * @param [in]  value : increment sent as the INCRBY argument
 * @return      int32_t : integer reply from redis (narrowed to 32 bits);
 *                        RedisError when checkInit() fails, no reply could be
 *                        obtained, or the reply is not an integer
 */
int32_t CPRedisBase::StringIncrby(const std::string& key, int32_t value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_STR_INCRBY << " " << key << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hset <key> <field> <value>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field inside the hash
 * @param [in]  value : value to store in the field
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::HashAdd(const std::string& key, const std::string& field, const std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_ADD << " " << key << " " << field << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hmset <key> <field0> <value0> <field1> <value1> ..." to
 *              redis, pairing field[i] with value[i].
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field names
 * @param [in]  value : values; must have the same size as field
 * @return      int32_t : 1 when redis answers with a status reply; RedisError
 *                        when checkInit() fails, the sizes differ, no reply
 *                        could be obtained, or the reply has another type
 */
int32_t CPRedisBase::HashMAdd(const std::string& key, const VecStr_t& field, const VecStr_t& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	if (field.size() != value.size()) {
		MLOG_ERROR(LOG_REDIS, "error|hash|madd|size|" << field.size() << "|" << value.size());
		return RedisError;
	}

	m_Query.clear();
	m_Query << REDIS_HASH_MADD << " " << key;
	for (size_t i = 0; i != field.size(); ++i)
		m_Query  << " " << field[i] << " " << value[i];
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_STATUS) {
				tResult = 1;
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hget <key> <field>" to redis and hand back the field value.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field inside the hash
 * @param [out] value : the reply payload is APPENDED to this string (existing
 *                      content is kept); untouched unless a string reply arrives
 * @return      int32_t : 1 on a string reply, 0 on a nil reply; RedisError when
 *                        checkInit() fails, no reply could be obtained, or
 *                        the reply has another type
 */
int32_t CPRedisBase::HashGet(const std::string& key, const std::string& field, std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_GET << " " << key << " " << field;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_STRING) {
				// Use the reported length so payloads containing NUL bytes
				// are not cut short.
				value.append(reply->str, reply->len);
				tResult = 1;
			} else if (reply->type == REDIS_REPLY_NIL) {
				tResult = 0;
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hmget <key> <field0> <field1> ..." to redis and decode the
 *              array reply into value through decodeArray().
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field names to fetch
 * @param [out] value : filled by decodeArray() when an array reply arrives
 * @return      int32_t : value.size() after decoding; RedisError when
 *                        checkInit() fails, no reply could be obtained, or
 *                        the reply is not an array
 */
int32_t CPRedisBase::HashMGet(const std::string& key, const VecStr_t& field, VecStr_t& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_MGET << " " << key;
	for (size_t i = 0; i != field.size(); ++i)
		m_Query << " " << field[i];
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_ARRAY) {
				decodeArray(reply, value);
				tResult = static_cast<int32_t>(value.size());
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hgetall <key>" to redis and decode the array reply into
 *              value through decodeArray().
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [out] value : filled by decodeArray() when an array reply arrives
 * @return      int32_t : value.size() after decoding; RedisError when
 *                        checkInit() fails, no reply could be obtained, or
 *                        the reply is not an array
 */
int32_t CPRedisBase::HashGetAll(const std::string& key, VecStr_t& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_ALL << " " << key;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_ARRAY) {
				decodeArray(reply, value);
				tResult = static_cast<int32_t>(value.size());
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hdel <key> <field>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field to remove
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::HashDel(const std::string& key, const std::string& field) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_DEL << " " << key << " " << field;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hdel <key> <field0> <field1> ..." to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : fields to remove
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::HashMDel(const std::string& key, const VecStr_t& field) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_DEL << " " << key;
	for (size_t i = 0; i != field.size(); ++i)
		m_Query << " " << field[i];
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "hincrby <key> <field> <value>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : hash key
 * @param [in]  field : field inside the hash
 * @param [in]  value : increment sent as the HINCRBY argument
 * @return      int32_t : integer reply from redis (narrowed to 32 bits);
 *                        RedisError when checkInit() fails, no reply could be
 *                        obtained, or the reply is not an integer
 */
int32_t CPRedisBase::HashIncrby(const std::string& key, const std::string& field, int32_t value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_HASH_INCRBY << " " << key << " " << field << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "sadd <key> <value>" to redis.
 *              SetMAdd() passes a space-joined member list as value, so the
 *              command text is still sent as one format string whose spaces
 *              separate arguments; only '%' is neutralised.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : set key
 * @param [in]  value : member (or space-separated members) to add
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::SetAdd(const std::string& key, const std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_SET_ADD << " " << key << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Add several members to a set by joining them with single
 *              spaces and forwarding the joined text to SetAdd().
 * @param [in]  key   : set key
 * @param [in]  value : members to add
 * @return      int32_t : whatever SetAdd() returns for the joined member list
 */
int32_t CPRedisBase::SetMAdd(const std::string& key, const VecStr_t& value) {
	std::string joined;
	const size_t total = value.size();
	for (size_t idx = 0; idx < total; ++idx) {
		// A separator goes in front of every member except the first.
		if (idx > 0)
			joined += " ";
		joined += value[idx];
	}
	return SetAdd(key, joined);
}

/*
 * @brief       Send "sismember <key> <value>" to redis.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : set key
 * @param [in]  value : member to look for
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::SetExist(const std::string& key, const std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_SET_EXIST << " " << key << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Send "srem <key> <value>" to redis.
 *              SetMDel() passes a space-joined member list as value, so the
 *              command text is still sent as one format string whose spaces
 *              separate arguments; only '%' is neutralised.
 *              The command is attempted at most REDIS_RECOUNT times; a missing
 *              reply triggers Reconnect() before the next attempt.
 * @param [in]  key   : set key
 * @param [in]  value : member (or space-separated members) to remove
 * @return      int32_t : integer reply from redis; RedisError when checkInit()
 *                        fails, no reply could be obtained, or the reply is
 *                        not an integer
 */
int32_t CPRedisBase::SetDel(const std::string& key, const std::string& value) {
	if (!checkInit()) {
		MLOG_ERROR(LOG_REDIS, "error|reconnect");
		return RedisError;
	}
	m_Query.clear();
	m_Query << REDIS_SET_DEL << " " << key << " " << value;
	MLOG_DEBUG(LOG_REDIS, "db|" << m_redisConfig.db << "|cmd|" << m_Query);

	// redisCommand() interprets its second argument as a printf-style format,
	// so every '%' from caller data is doubled to be sent literally instead of
	// being parsed as a conversion specifier.
	std::string tFormat;
	for (const char* p = m_Query.c_str(); '\0' != *p; ++p) {
		if ('%' == *p)
			tFormat.push_back('%');
		tFormat.push_back(*p);
	}

	int32_t tResult = RedisError;
	int32_t tCount = REDIS_RECOUNT;
	do {
		redisReply* reply = (redisReply*) redisCommand(m_redisContext, tFormat.c_str());
		if (NULL == reply) {
			// No reply: try to re-establish the connection before retrying.
			if (!Reconnect()) {
				MLOG_ERROR(LOG_REDIS, "error|reconnect");
				break;
			}
		} else {
			if (reply->type == REDIS_REPLY_INTEGER) {
				tResult = static_cast<int32_t>(reply->integer);
			} else {
				MLOG_ERROR(LOG_REDIS, "none|type|" << reply->type);
			}

			freeReplyObject(reply);
			break;
		}
	} while (--tCount);

	MLOG_DEBUG(LOG_REDIS, "result|" << tResult);
	return tResult;
}

/*
 * @brief       Remove several members from a set by joining them with single
 *              spaces and forwarding the joined text to SetDel().
 * @param [in]  key   : set key
 * @param [in]  value : members to remove
 * @return      int32_t : whatever SetDel() returns for the joined member list
 */
int32_t CPRedisBase::SetMDel(const std::string& key, const VecStr_t& value) {
	std::string joined;
	const size_t total = value.size();
	for (size_t idx = 0; idx < total; ++idx) {
		// A separator goes in front of every member except the first.
		if (idx > 0)
			joined += " ";
		joined += value[idx];
	}
	return SetDel(key, joined);
}

void CPRedisBase::Flushall() {
	if (NULL == m_redisContext)
		return;
	redisReply* reply = (redisReply*) redisCommand(m_redisContext, "flushall");
	if (!reply)
		return;

	freeReplyObject(reply);
}

void CPRedisBase::FlushDB() {
	if (NULL == m_redisContext)
		return;
	redisReply* reply = (redisReply*) redisCommand(m_redisContext, "flushdb");
	if (!reply)
		return;

	freeReplyObject(reply);
}


